Module `redvox.tests.more_tests` (module names cannot contain spaces — presumably `more_tests`; confirm against the package layout)

Expand source code
import os.path
import pickle
import pprint
import sys
import timeit

import numpy as np
import pandas as pd
import psutil

import redpandas.redpd_df
import redpandas.redpd_filter as rpd_filter
import redpandas.redpd_xcorr as rpd_xcorr
import redpandas.redpd_plot.wiggles as rpd_plt
import redpandas.redpd_tfr as rpd_tfr
import redpandas.redpd_plot.mesh as rpd_plt_tfr

import redvox.api1000.wrapped_redvox_packet.wrapped_packet as wp
import redvox.common.date_time_utils as dt
import redvox.settings as settings
from redvox.common.api_reader import ApiReader
from redvox.common.data_window import DataWindow, DataWindowConfig
from redvox.common.event_stream import EventStreams


def main():
    """Ad-hoc benchmark/inspection script for redvox DataWindow loading.

    Builds a DataWindow from a hard-coded local directory, times the load,
    and prints per-station ML-window summaries.  Everything after the first
    ``sys.exit(1)`` is retained scratch code (audio plotting, per-station
    pickling, a second DataWindowConfig experiment) toggled by hand.
    """
    # Observations from earlier runs:
    #   compressed data to DW is 4.68-4.32 times
    #   up to 6 times for 8k sample rate
    #   higher sample rate means larger multiplication
    #
    #   2.5 mins with disk write to temp (lower RAM Usage)
    #   1.5 mins with mem write (high ram usage)

    settings.set_parallelism_enabled(False)
    print("parallel: ", settings.is_parallelism_enabled())

    # save_path is only referenced by the commented-out save/load lines below.
    save_path = "/Users/tyler/Documents/ml_export"
    # path = "/Users/tyler/Documents/pyarrowreadertest/inl_test"
    # path = "/Users/tyler/Documents/snl_21_9_27"
    # path = "/Users/tyler/Documents/tss_test"
    path = "/Users/tyler/Documents/ml_export/test_wamv"
    config = DataWindowConfig(
        path,
        True,  # second positional arg — presumably structured layout; confirm against DataWindowConfig
        # use_model_correction=False,
        # station_ids=["1637610022"]
        # station_ids=["1637681007"]
        # station_ids=["1637665009"]
        # start_datetime=dt.datetime_from_epoch_seconds_utc(1661203100),
        # end_datetime=dt.datetime_from_epoch_seconds_utc(1661203200)
    )

    # Time the DataWindow build.  (Renamed from s/e so the loop variable
    # below no longer shadows the timer value.)
    t_start = timeit.default_timer()
    # sf_test = DataWindow.deserialize("/Users/tyler/Documents/pyarrowreadertest/large_test/large_test_lz4.pkl.lz4")
    # lz4 = 1542448.119 KB uncompressed
    # parquet = 1565174.924 KB uncompressed
    sf_test = DataWindow("test", config=config, out_type="NONE", make_runme=False, debug=False)
    # sf_test.save()
    # sf_test = DataWindow.load(os.path.join(save_path, "test.json"))
    t_end = timeit.default_timer()
    # sf_test = DataWindow.deserialize("/Users/tyler/Documents/duplicate_test/dw_1641329393000278_3.pkl.lz4")
    # print("audio_rate", sf_test.first_station().audio_sample_rate_nominal_hz())
    print("load dw", t_end - t_start)

    # Show the top-3 entries of the first 3 ML windows for every station.
    for station in sf_test.stations():
        print(station.id())
        for i in range(3):
            print(station.event_data().ml_data.windows[i].retain_top(3))

    sys.exit(1)

    # ------------------------------------------------------------------
    # Unreachable scratch sections below — kept for manual toggling.
    # ------------------------------------------------------------------

    # sf_test = DataWindow.deserialize("/Users/tyler/Documents/perf_tests/dw_1661472735001221_1.pkl.lz4")
    # print(sf_test.first_station().audio_sensor().get_microphone_data()[-50:-1])

    import matplotlib.pyplot as plt

    print("Plotting data")
    fig, axes = plt.subplots(1, 1, figsize=(8, 6), sharex=True)
    axes.set_title("Audio Data")

    # Fixed: new_offset was previously undefined here (NameError if this
    # section ever ran).  TODO: derive from timesync best offset when needed.
    new_offset = 0.0

    for s in sf_test.stations():
        print(s.id())
        # (manual timesync override experiments)
        # if s.id() == "1637610022":
        #     ltcs = s.timesync_data().best_latency_per_exchange()
        #     new_best_latency = np.min([n for n in ltcs if n > 500])
        #     s.timesync_data()._best_latency = new_best_latency
        #     s.timesync_data()._best_latency_index = int(np.where(ltcs == new_best_latency)[0][0])
        #     # s.timesync_data()._best_offset = s.timesync_data().offsets()[1][s.timesync_data().best_latency_index()]
        #     s.timesync_data()._best_offset = -50464006.0
        # s._correct_timestamps = True
        # s.update_timestamps()

        print(s.first_data_timestamp())
        print(s.last_data_timestamp())

        # axes.plot(s.audio_sensor().data_timestamps() - 1632753420000000 + new_offset,
        #                  s.audio_sensor().get_microphone_data())

        # Fixed: previously a single out22.pkl was reopened in "wb" mode on
        # every iteration, so only the last station's data survived.  Write
        # one pickle per station instead.
        station_pkl = os.path.join(path, f"out22_{s.id()}.pkl")
        with open(station_pkl, "wb") as outs:
            pickle.dump(
                [
                    s.audio_sensor().data_timestamps() - 1632753420000000 + new_offset,
                    s.audio_sensor().get_microphone_data(),
                ],
                outs,
            )
        axes.set_ylabel(s.id())
        # tsync = s.timesync_data()
        #
        # best_exchanges = tsync.sync_exchanges()
        #
        # print("server:")
        # print(best_exchanges[0][tsync.best_latency_index()])
        # print(best_exchanges[1][tsync.best_latency_index()])
        # print(best_exchanges[2][tsync.best_latency_index()])
        # print("device:")
        # print(best_exchanges[3][tsync.best_latency_index()])
        # print(best_exchanges[4][tsync.best_latency_index()])
        # print(best_exchanges[5][tsync.best_latency_index()])
        # print(s.location_sensor().get_location_provider_data())
        # print(s.location_sensor().data_timestamps())
        # print(s.location_sensor().get_gps_timestamps_data())
    plt.xlabel("Time")
    plt.show()
    sys.exit(1)

    # asdfasdf = [1.0000014 ** n for n in range(9999999)]
    # print(asdfasdf.__sizeof__())

    # Second experiment: a narrow, buffered DataWindowConfig for one station.
    EPISODE_START_DATETIME = dt.datetime_from_epoch_seconds_utc(1568132940)
    EPISODE_END_DATETIME = dt.datetime_from_epoch_seconds_utc(1568132984)
    INPUT_DIR = "/Users/tyler/Downloads/data"
    # # Absolute path for output pickle and parquet files
    RPD_DIR = "rpd_files_sdk31"
    OUTPUT_DIR = os.path.join(INPUT_DIR, RPD_DIR)

    dw_config = DataWindowConfig(
        input_dir=INPUT_DIR,
        start_datetime=EPISODE_START_DATETIME,
        end_datetime=EPISODE_END_DATETIME,
        start_buffer_td=dt.timedelta(minutes=3),
        station_ids=["0000000021"],
    )
    # station_ids=['0000000011', '0000000012', '0000000013', '0000000021',
    #              '0000000022', '0000000023', '0000000024'])

    # dw = DataWindow(event_name="rdvx_flight2", config=dw_config, debug=False,
    #                 out_dir=OUTPUT_DIR, out_type="NONE")

    # dw.save()
    # asdahfs = dw.load(os.path.join(OUTPUT_DIR, "rdvx_flight2.json"))
    # print(asdahfs.pretty())
    sys.exit(1)


# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":
    main()

Functions

def main()
Expand source code
def main():
    """Ad-hoc benchmark/inspection script for redvox DataWindow loading.

    Builds a DataWindow from a hard-coded local directory, times the load,
    and prints per-station ML-window summaries.  Everything after the first
    ``exit(1)`` is unreachable scratch code toggled by hand.
    """
    # compressed data to DW is 4.68-4.32 times
    # up to 6 times for 8k sample rate
    # higher sample rate means larger multiplication

    # 2.5 mins with disk write to temp (lower RAM Usage)
    # 1.5 mins with mem write (high ram usage)

    settings.set_parallelism_enabled(False)

    print("parallel: ", settings.is_parallelism_enabled())

    # save_path is only used by the commented-out save/load lines below.
    save_path = "/Users/tyler/Documents/ml_export"
    # path = "/Users/tyler/Documents/pyarrowreadertest/inl_test"
    # path = "/Users/tyler/Documents/snl_21_9_27"
    # path = "/Users/tyler/Documents/tss_test"
    path = "/Users/tyler/Documents/ml_export/test_wamv"
    out_pkl = os.path.join(path, "out22.pkl")
    config = DataWindowConfig(
        path,
        True,  # second positional arg — presumably structured layout; confirm against DataWindowConfig
        # use_model_correction=False,
        # station_ids=["1637610022"]
        # station_ids=["1637610022"]
        # station_ids=["1637681007"]
        # station_ids=["1637665009"]
        # start_datetime=dt.datetime_from_epoch_seconds_utc(1661203100),
        # end_datetime=dt.datetime_from_epoch_seconds_utc(1661203200)
    )

    # Time the DataWindow build.
    s = timeit.default_timer()
    # sf_test = DataWindow.deserialize("/Users/tyler/Documents/pyarrowreadertest/large_test/large_test_lz4.pkl.lz4")
    # lz4 = 1542448.119 KB uncompressed
    # parquet = 1565174.924 KB uncompressed
    sf_test = DataWindow("test", config=config, out_type="NONE", make_runme=False, debug=False)
    # sf_test.save()
    # sf_test = DataWindow.load(os.path.join(save_path, "test.json"))
    e = timeit.default_timer()
    # sf_test = DataWindow.deserialize("/Users/tyler/Documents/duplicate_test/dw_1641329393000278_3.pkl.lz4")
    # print("audio_rate", sf_test.first_station().audio_sample_rate_nominal_hz())
    print("load dw", e - s)
    # Print the top-3 entries of the first 3 ML windows for every station.
    for t in sf_test.stations():
        print(t.id())
        for i in range(3):
            print(t.event_data().ml_data.windows[i].retain_top(3))

    exit(1)

    # NOTE(review): nothing below this point is reachable.

    # sf_test = DataWindow.deserialize("/Users/tyler/Documents/perf_tests/dw_1661472735001221_1.pkl.lz4")

    # print(sf_test.first_station().audio_sensor().get_microphone_data()[-50:-1])

    import matplotlib.pyplot as plt

    print("Plotting data")
    plts = plt.subplots(1, 1, figsize=(8, 6), sharex=True)
    fig = plts[0]
    axes = plts[1]
    axes.set_title("Audio Data")

    cur_i = 0

    # NOTE(review): this loop variable s shadows the timer value bound above.
    for s in sf_test.stations():
        print(s.id())
        # if s.id() == "1637610022":
        #     ltcs = s.timesync_data().best_latency_per_exchange()
        #     new_best_latency = np.min([n for n in ltcs if n > 500])
        #     s.timesync_data()._best_latency = new_best_latency
        #     s.timesync_data()._best_latency_index = int(np.where(ltcs == new_best_latency)[0][0])
        #     # s.timesync_data()._best_offset = s.timesync_data().offsets()[1][s.timesync_data().best_latency_index()]
        #     s.timesync_data()._best_offset = -50464006.0
        # s._correct_timestamps = True
        # s.update_timestamps()

        print(s.first_data_timestamp())
        print(s.last_data_timestamp())

        # axes.plot(s.audio_sensor().data_timestamps() - 1632753420000000 + new_offset,
        #                  s.audio_sensor().get_microphone_data())

        # NOTE(review): new_offset is never defined in this function — this
        # dump would raise NameError if the code ever ran.  Also, out_pkl is
        # reopened in "wb" mode each iteration, so only the last station's
        # data would persist.
        with open(out_pkl, "wb") as outs:
            pickle.dump(
                [
                    s.audio_sensor().data_timestamps() - 1632753420000000 + new_offset,
                    s.audio_sensor().get_microphone_data(),
                ],
                outs,
            )
        axes.set_ylabel(s.id())
        cur_i += 1
        # tsync = s.timesync_data()
        #
        # best_exchanges = tsync.sync_exchanges()
        #
        # print("server:")
        # print(best_exchanges[0][tsync.best_latency_index()])
        # print(best_exchanges[1][tsync.best_latency_index()])
        # print(best_exchanges[2][tsync.best_latency_index()])
        # print("device:")
        # print(best_exchanges[3][tsync.best_latency_index()])
        # print(best_exchanges[4][tsync.best_latency_index()])
        # print(best_exchanges[5][tsync.best_latency_index()])
        # print(s.location_sensor().get_location_provider_data())
        # print(s.location_sensor().data_timestamps())
        # print(s.location_sensor().get_gps_timestamps_data())
    plt.xlabel("Time")
    plt.show()
    exit(1)

    # asdfasdf = [1.0000014 ** n for n in range(9999999)]
    # print(asdfasdf.__sizeof__())

    # Second experiment: a narrow, buffered DataWindowConfig for one station.
    EPISODE_START_DATETIME = dt.datetime_from_epoch_seconds_utc(1568132940)
    EPISODE_END_DATETIME = dt.datetime_from_epoch_seconds_utc(1568132984)
    INPUT_DIR = "/Users/tyler/Downloads/data"
    # # Absolute path for output pickle and parquet files
    RPD_DIR = "rpd_files_sdk31"
    OUTPUT_DIR = os.path.join(INPUT_DIR, RPD_DIR)

    dw_config = DataWindowConfig(
        input_dir=INPUT_DIR,
        start_datetime=EPISODE_START_DATETIME,
        end_datetime=EPISODE_END_DATETIME,
        start_buffer_td=dt.timedelta(minutes=3),
        station_ids=["0000000021"],
    )
    # station_ids=['0000000011', '0000000012', '0000000013', '0000000021',
    #              '0000000022', '0000000023', '0000000024'])

    # dw = DataWindow(event_name="rdvx_flight2", config=dw_config, debug=False,
    #                 out_dir=OUTPUT_DIR, out_type="NONE")

    # dw.save()
    # asdahfs = dw.load(os.path.join(OUTPUT_DIR, "rdvx_flight2.json"))
    # print(asdahfs.pretty())
    exit(1)